package com.atguigu.day08;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo: declaring an event-time attribute and its watermark directly in a SQL DDL statement.
 *
 * <p>The program registers a filesystem/CSV-backed table named {@code sensor}
 * (path {@code input/sensor-sql.txt}) and prints the result of
 * {@code select * from sensor}.
 */
public class Flink12_SQL_DDL_EventTime {

    /**
     * DDL for the {@code sensor} table.
     *
     * <ul>
     *   <li>{@code t} is a computed column derived from {@code ts} via
     *       {@code to_timestamp_ltz(ts,3)}; this is the syntax for Flink 1.13 and later.
     *       The form used before that was
     *       {@code t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))}.
     *       NOTE(review): both forms imply {@code ts} is epoch milliseconds — confirm
     *       against the input file.</li>
     *   <li>The watermark is declared on {@code t} as {@code t - interval '5' second}.</li>
     * </ul>
     */
    private static final String SENSOR_DDL =
            "create table sensor("
                    // physical columns
                    + "id String,ts bigint,vc int,"
                    // computed event-time column (1.13+ syntax)
                    + "t as to_timestamp_ltz(ts,3),"
                    // watermark definition on the event-time column
                    + "watermark for t as t - interval '5' second"
                    + ") with ("
                    + "'connector' = 'filesystem',"
                    + "'path' = 'input/sensor-sql.txt',"
                    + "'format' = 'csv'"
                    + ")";

    /** Query whose result is printed by {@link #main(String[])}. */
    private static final String SELECT_ALL_SENSORS = "select * from sensor";

    /**
     * Sets up the stream and table environments, creates the {@code sensor} table
     * and prints every row selected from it.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // 1. Obtain the stream execution environment and run with parallelism 1.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);

        // 2. Obtain the table environment on top of the stream environment.
        StreamTableEnvironment sqlEnv = StreamTableEnvironment.create(streamEnv);

        // Set the table-layer local time zone option to GMT.
        Configuration tableOptions = sqlEnv.getConfig().getConfiguration();
        tableOptions.setString("table.local-time-zone", "GMT");

        // 3. Declare the event-time attribute inside the CREATE TABLE statement.
        sqlEnv.executeSql(SENSOR_DDL);

        // 4. Query the table and print the result.
        sqlEnv.executeSql(SELECT_ALL_SENSORS).print();
    }
}
